package cn.doitedu.rtdw.data_etl;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: deep as the sea
 * @Site: <a href="https://www.51doit.com">Duoyi Education</a>
 * @QQ: 657270652
 * @Date: 2023/2/6
 * @Desc: Learn big data at Duoyi Education.
 *  Recommendation-behavior overview statistics job (light-aggregation ETL).
 *  <p>
 *  Pipeline:
 *  <ol>
 *    <li>Reads JSON events from the Kafka topic {@code mall-events-wide}.</li>
 *    <li>Keeps only {@code rec_show} / {@code rec_click} events.</li>
 *    <li>Enriches each event via a processing-time lookup join against the HBase
 *        table {@code dim_rec_info}, keyed by {@code properties['rec_region_id']},
 *        to obtain the recommendation region type ({@code rt}) and model ({@code md}).</li>
 *    <li>Writes one row per event, with 0/1 {@code show_count}/{@code click_count}
 *        flags, into the Doris table {@code dws.recommend_ana_agg}.</li>
 *  </ol>
 *  NOTE(review): this job performs no GROUP BY itself; the summing of the 0/1 flags
 *  is presumably done by the Doris table's aggregate key model — confirm against
 *  the Doris DDL.
 **/
public class E07_EtlJob_RecommendOverviewAgg {

    /** Checkpoint interval, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 2000;

    /** Checkpoint storage location (a local Windows path; adjust per environment). */
    private static final String CHECKPOINT_DIR = "file:/d:/ckpt";

    /** Logical name of the Doris sink table (physical table: dws.recommend_ana_agg). */
    private static final String SINK_TABLE = "recommend_agg_doris";

    public static void main(String[] args) {
        StreamExecutionEnvironment env = createEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        registerKafkaSource(tenv);
        registerDorisSink(tenv);
        registerHbaseDim(tenv);

        // Submit the streaming INSERT ... SELECT job.
        tenv.executeSql(buildInsertSql());
    }

    /** Builds the execution environment: exactly-once checkpoints, parallelism 1. */
    private static StreamExecutionEnvironment createEnvironment() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage(CHECKPOINT_DIR);
        env.setParallelism(1);
        return env;
    }

    /** Registers the logical source table over the Kafka wide-event topic. */
    private static void registerKafkaSource(StreamTableEnvironment tenv) {
        tenv.executeSql(
                " CREATE TABLE mall_events_commondim_kfksource(          "
                        + "     user_id           INT,                            "
                        + "     event_id          string,                         "
                        + "     event_time        bigint,                         "
                        + "     device_type       string,                         "
                        + "     properties        map<string,string>,             "
                        + "     page_type         STRING,                         "
                        // Processing-time attribute, required by the lookup join below.
                        + "     pt AS proctime()                                  "
                        + " ) WITH (                                             "
                        + "  'connector' = 'kafka',                              "
                        + "  'topic' = 'mall-events-wide',                     "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',    "
                        + "  'properties.group.id' = 'testGroup',                "
                        + "  'scan.startup.mode' = 'latest-offset',            "
                        + "  'value.format'='json',                              "
                        + "  'value.json.fail-on-missing-field'='false',         "
                        + "  'value.fields-include' = 'EXCEPT_KEY')              ");
    }

    /** Registers the logical sink table mapped onto Doris table dws.recommend_ana_agg. */
    private static void registerDorisSink(StreamTableEnvironment tenv) {
        tenv.executeSql(
                " create table " + SINK_TABLE + "(    "
                        + "     user_id          INT,          "
                        + "     device_type      VARCHAR(20), "
                        + "     page_type        VARCHAR(20),  "
                        + "     rec_region_id    VARCHAR(60),  "
                        + "     rec_region_type  VARCHAR(60),  "
                        + "     rec_model        VARCHAR(60),  "
                        + "     show_count       INT,     "
                        + "     click_count      INT      "
                        + " ) WITH (                               "
                        + "    'connector' = 'doris',              "
                        + "    'fenodes' = 'doitedu:8030',         "
                        + "    'table.identifier' = 'dws.recommend_ana_agg',  "
                        + "    'username' = 'root',                "
                        + "    'password' = '',                    "
                        // A timestamp suffix makes the label prefix differ per launch,
                        // presumably so Doris does not reject reused stream-load labels.
                        + "    'sink.label-prefix' = 'doris_tl" + System.currentTimeMillis() + "')");
    }

    /** Registers the HBase recommendation-slot dimension table used for lookup joins. */
    private static void registerHbaseDim(StreamTableEnvironment tenv) {
        tenv.executeSql(
                "CREATE TABLE rec_info_hbase( "
                        + " id STRING, "
                        + " f ROW<rt STRING, rn STRING, md STRING>,"
                        + " PRIMARY KEY (id) NOT ENFORCED "
                        + ") WITH (                             "
                        + " 'connector' = 'hbase-2.2',          "
                        + " 'table-name' = 'dim_rec_info',      "
                        + " 'zookeeper.quorum' = 'doitedu:2181' "
                        + ")");
    }

    /**
     * Builds the INSERT statement: filter recommendation events, extract the region id
     * once, lookup-join the HBase dimension, and emit 0/1 show/click flags.
     * Output column order matches the sink table's declared column order.
     */
    private static String buildInsertSql() {
        return " INSERT INTO " + SINK_TABLE + " "
                + " WITH tmp AS ( "
                + "   SELECT "
                + "     user_id, "
                + "     event_id, "
                + "     device_type, "
                + "     page_type, "
                // Extract the lookup key once so the join condition is a plain column.
                + "     properties['rec_region_id'] AS rec_region_id, "
                + "     pt "
                + "   FROM mall_events_commondim_kfksource "
                + "   WHERE event_id IN ('rec_show','rec_click') "
                + " ) "
                + " SELECT "
                + "   tmp.user_id, "
                + "   tmp.device_type, "
                + "   tmp.page_type, "
                + "   tmp.rec_region_id, "
                + "   r.rt AS rec_region_type, "
                + "   r.md AS rec_model, "
                + "   IF(tmp.event_id = 'rec_show', 1, 0) AS show_count, "
                + "   IF(tmp.event_id = 'rec_click', 1, 0) AS click_count "
                + " FROM tmp "
                // LEFT JOIN keeps events whose region id has no dimension row (dim columns become NULL).
                + " LEFT JOIN rec_info_hbase FOR SYSTEM_TIME AS OF tmp.pt AS r "
                + "   ON tmp.rec_region_id = r.id ";
    }
}
